Conversation

@nuno-faria
Contributor

Which issue does this PR close?

N/A.

Rationale for this change

Upgrade to the latest version of datafusion.

What changes are included in this PR?

Fixed breaking changes.

Are there any user-facing changes?

No.

Comment on lines +898 to +901
"List(nullable Int64)",
"List(nullable Int64)",
"List(nullable Int64)",
"List(nullable Int64)",

Lists have a new render format: apache/arrow-rs#8290
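
A quick way to see the new format from Rust (a hedged sketch; the compact rendering after apache/arrow-rs#8290 is assumed to come from the type's Display output):

use arrow::datatypes::{DataType, Field};
use std::sync::Arc;

// arrow-rs 57 renders list types compactly, e.g. "List(nullable Int64)",
// instead of spelling out the full inner Field.
fn main() {
    let dt = DataType::List(Arc::new(Field::new("item", DataType::Int64, true)));
    println!("{dt}"); // expected (hedged): List(nullable Int64)
}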

(
    f.regexp_replace(column("a"), literal("(ell|orl)"), literal("-")),
-   pa.array(["H-o", "W-d", "!"]),
+   pa.array(["H-o", "W-d", "!"], type=pa.string_view()),

regexp_replace now uses UTF8View: apache/datafusion#17195
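
For reference, a minimal Rust-side sketch of observing the new output type (illustrative, not part of this PR):

use datafusion::prelude::*;

// Hedged sketch: after apache/datafusion#17195, regexp_replace on Utf8
// arguments produces Utf8View, so the result field type changes.
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    let df = ctx
        .sql("SELECT regexp_replace('Hello', '(ell|orl)', '-') AS a")
        .await?;
    // Expected (hedged) to print Utf8View rather than Utf8.
    println!("{:?}", df.schema().field(0).data_type());
    Ok(())
}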

impl PyCatalog {
    #[new]
-   fn new(catalog: PyObject) -> Self {
+   fn new(catalog: Py<PyAny>) -> Self {

)))?;

-   Python::with_gil(|py| {
+   Python::attach(|py| {

let streams = spawn_future(py, async move { df.execute_stream_partitioned().await })?;

-   let mut schema: Schema = self.df.schema().to_owned().into();
+   let mut schema: Schema = self.df.schema().to_owned().as_arrow().clone();

#[allow(clippy::too_many_arguments)]
#[new]
- #[pyo3(signature = (schema, name, location, file_type, table_partition_cols, if_not_exists, temporary, order_exprs, unbounded, options, constraints, column_defaults, definition=None))]
+ #[pyo3(signature = (schema, name, location, file_type, table_partition_cols, if_not_exists, or_replace, temporary, order_exprs, unbounded, options, constraints, column_defaults, definition=None))]

impl PyPrepare {
    #[new]
-   pub fn new(name: String, data_types: Vec<PyDataType>, input: PyLogicalPlan) -> Self {
+   pub fn new(name: String, fields: Vec<PyArrowType<Field>>, input: PyLogicalPlan) -> Self {

Prepare now stores fields: apache/datafusion#17986

use datafusion::functions_aggregate::all_default_aggregate_functions;
use datafusion::functions_window::all_default_window_functions;
use datafusion::logical_expr::expr::{Alias, FieldMetadata, WindowFunction, WindowFunctionParams};
- use datafusion::logical_expr::sqlparser::ast::NullTreatment as DFNullTreatment;

NullTreatment has been moved so it no longer relies on sqlparser: apache/datafusion#17332

@timsaucer
Member

Thank you for getting started on this! I was hoping we might delay slightly until DF 51.1.0 releases (apache/datafusion#18843). Do you think it's okay to wait a bit before release? Either way, we can get this PR in and then update the lock.

@nuno-faria
Contributor Author

> Thank you for getting started on this! I was hoping we might delay slightly until DF 51.1.0 releases (apache/datafusion#18843). Do you think it's okay to wait a bit before release? Either way, we can get this PR in and then update the lock.

Yeah, no problem! Meanwhile, it appears that the actions are stuck on test_arrow_c_stream_interrupted, so it's better to stop the workflow. I will have to take a look at it.

@nuno-faria
Contributor Author

I don't know what might be causing the read_all method in the arrow_c_stream_interrupted test to not receive the interrupt. I searched in the datafusion and arrow repos but could not find recent changes that could affect this. @kosiew do you have a clue what might be causing this?

@timsaucer
Member

From a little investigation, I suspect that py.check_signals() is no longer running on the main thread. From the documentation: "If the function is called from a non-main thread, or under a non-main Python interpreter, it does nothing yet still returns Ok(())."

I wonder if the new py.detach() and py.attach() in 0.25 -> 0.26 are more than simply a rename of the deprecated APIs.

@kosiew
Contributor

kosiew commented Nov 26, 2025

@nuno-faria
Thanks for the ping.
I am travelling and will not be able to look into this for the rest of the week.

@timsaucer
Member

Further investigation. With this diff:

diff --git a/src/utils.rs b/src/utils.rs
index 6cc9626..251eb41 100644
--- a/src/utils.rs
+++ b/src/utils.rs
@@ -84,7 +84,10 @@ where
                 tokio::select! {
                     res = &mut fut => break Ok(res),
                     _ = sleep(INTERVAL_CHECK_SIGNALS) => {
-                        Python::attach(|py| py.check_signals())?;
+                        Python::attach(|py| {
+                            let threading = py.import("threading")?;
+                            let _current_thread = threading.call_method0("current_thread")?;
+                            py.check_signals()})?;
                     }
                 }
             }

Then, instead of getting a keyboard interrupt signal, we now get a datafusion error wrapping a keyboard interrupt. I don't think it's important that the above code is calling the import or the call_method; I think the relevant bit is that the second call takes time while in the attached state.

@nuno-faria
Contributor Author

I was just looking into this and made a similar change to check whether it is executing on the main thread:

tokio::select! {
    res = &mut fut => break Ok(res),
    _ = sleep(INTERVAL_CHECK_SIGNALS) => {
        Python::attach(|py| {
            let threading = PyModule::import(py, "threading")?;
            let get_ident = threading.getattr("get_ident")?;
            let thread_id: u64 = get_ident.call0()?.extract()?;
            println!("thread_id: {}", thread_id);
            let x = py.check_signals();
            println!("x: {:?}", x);
            x
        })?;
    }
}

Looking at the output, we can see that the thread ids match:

sent signal to 8116 # printed "threading.main_thread().ident" in the python test
thread_id: 8116
x: Ok(())
thread_id: 8116
x: Ok(())
thread_id: 8116
x: Ok(())
thread_id: 8116
x: Ok(())
thread_id: 8116
...

I even delayed the signal sent by the interrupt function just in case, but still wasn't able to trigger the exception:

thread_id: 14100
x: Ok(())
thread_id: 14100
x: Ok(())
thread_id: 14100
x: Ok(())
thread_id: 14100
x: Ok(())
sent signal to 14100
thread_id: 14100
x: Ok(())
thread_id: 14100
x: Ok(())
...

However, manually sending CTRL-C results in the exception being returned by check_signals:

thread_id: 14100
x: Ok(())
thread_id: 14100
x: Ok(())
thread_id: 14100
x: Ok(())
thread_id: 14100
x: Err(PyErr { type: <class 'KeyboardInterrupt'>, value: KeyboardInterrupt(), traceback: None })
PASSED

I also checked whether it was somehow an issue with PyThreadState_SetAsyncExc, but the test test_collect_interrupted works as expected. In this test, though, the signal is also not reaching check_signals:

python/tests/test_dataframe.py::test_collect_interrupted thread_id: 30632
x: Ok(())
thread_id: 30632
x: Ok(())
PASSED # no print of the interrupt

@timsaucer
Member

I have a suspicion that we're uncovering a feature that worked in pyo3 0.25, but not as intended. I suspect we're getting the keyboard exception raised in the record batch reader rather than in wait_for_future. My reasoning is this: if it were our code catching the raised keyboard interrupt, we should be getting the datafusion-wrapped error and not the bare keyboard exception. By putting in these functions that hold the GIL longer, we are actually able to catch those signals. I bet there is some interaction in which our tokio runtime attaching to the GIL is letting the Python thread get that keyboard interrupt into the record batch reader somehow. This is more of a feeling than anything I've discovered in the code changes from 0.25 to 0.26.

@nuno-faria
Contributor Author

I checked, and in the previous version the reader.read_all() method in test_arrow_c_stream_interrupted never has a chance to call check_signals. It executes PartitionedDataFrameStreamReader::next, which in turn calls poll_next_batch, which immediately returns the result, so the tokio::select! exits right away. In the new datafusion version, a batch is never generated.

But it is still odd that manually sending an interrupt with CTRL-C works but using PyThreadState_SetAsyncExc does not.

@timsaucer
Member

I've done some more testing. I went in and removed the pyarrow feature from our arrow dependency (and then did a nasty hack to comment out all the parts of our code that used it) to run the test with DF50 but pyo3 0.26. It passed. Then I bumped DF50 + Arrow 57 and got the same stalling behavior we're finding now.

I do think we've found that the signal catching isn't working exactly like we thought it was. But I also think it's worth taking a look at the changelogs for arrow and datafusion.

@timsaucer
Member

> I checked, and in the previous version the reader.read_all() method in test_arrow_c_stream_interrupted never has a chance to call check_signals. It executes PartitionedDataFrameStreamReader::next, which in turn calls poll_next_batch, which immediately returns the result, so the tokio::select! exits right away. In the new datafusion version, a batch is never generated.

Are you sure this isn't during the first call to wait_for_future? We should get one call that returns very fast in execute_stream_partitioned and then another call during PartitionedDataFrameStreamReader::next.

@timsaucer
Member

I am tempted to take a stab at redoing our runtime and using a CPU Executor similar to this example: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/query_planning/thread_pools.rs

The idea would be that all datafusion work gets shipped over to a dedicated threadpool to keep the Python thread lighter, but I am not sure whether that's worth it. I may be over-analyzing a problem that isn't much of an actual problem. I would like to figure out exactly why this isn't working, though.
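
A rough sketch of that idea, with illustrative names (run_on_df_pool is not an existing API here):

use std::future::Future;
use tokio::runtime::Runtime;

// Hedged sketch of the "CPU executor" idea: ship DataFusion futures to a
// dedicated runtime so the thread interacting with Python stays responsive
// to signal checks.
fn run_on_df_pool<F, T>(df_runtime: &Runtime, fut: F) -> T
where
    F: Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    // spawn executes the future on the dedicated pool's worker threads;
    // block_on only waits on the join handle from the calling thread.
    let handle = df_runtime.spawn(fut);
    df_runtime.block_on(handle).expect("task panicked")
}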

@nuno-faria
Contributor Author

> Are you sure this isn't during the first call to wait_for_future? We should get one call that returns very fast in execute_stream_partitioned and then another call during PartitionedDataFrameStreamReader::next.

I added a small println!("result: {:?}", result); to PartitionedDataFrameStreamReader::next, just after receiving the result, to confirm:

impl Iterator for PartitionedDataFrameStreamReader {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        while self.current < self.streams.len() {
            let stream = &mut self.streams[self.current];
            let fut = poll_next_batch(stream);
            let result = Python::with_gil(|py| wait_for_future(py, fut));
+           println!("result: {:?}", result);

            match result {

This is the result now in DF51 (also with the previous debug info in wait_for_future), i.e., no result:

python/tests/test_dataframe.py::test_arrow_c_stream_interrupted thread_id: 35716
x: Ok(())
sent signal to 35716
thread_id: 35716
x: Ok(())
thread_id: 35716
x: Ok(())
thread_id: 35716
x: Ok(())
thread_id: 35716
x: Ok(())
thread_id: 35716
x: Ok(())
thread_id: 35716
x: Ok(())
thread_id: 35716
x: Ok(())
thread_id: 35716
x: Ok(())
thread_id: 35716
...

This is the result in DF50 (it prints 2992 result: lines in total, with the last three being result: Ok(Ok(None)), while the others return Some(RecordBatch(...))):

python/tests/test_dataframe.py::test_arrow_c_stream_interrupted result: Ok(Ok(Some(RecordBatch { schema: Schema { fields: [Field { name: "a", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "b", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "c", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "d", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "a2", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "b2", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "e", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "f", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, columns: [PrimitiveArray<Int64>
[
  1001,
  1101,
  1201,
  1301,
  1401,
  1501,
  1601,
  1701,
  1801,
  1901,
  ...8172 elements...,
  1517,
  1617,
  1717,
  1817,
  1917,
  6017,
  6117,
  6217,
  6317,
  6417,
], StringArray
[
  "value_1001",
  "value_1101",
  "value_1201",
  "value_1301",
  "value_1401",
  "value_1501",
  "value_1601",
  "value_1701",
  "value_1801",
  "value_1901",
  ...8172 elements...,
  "value_1517",
  "value_1617",
  "value_1717",
  "value_1817",
  "value_1917",
  "value_6017",
  "value_6117",
  "value_6217",
  "value_6317",
  "value_6417",
], PrimitiveArray<Float64>
...
row_count: 8192 })))
result: Ok(Ok(Some(RecordBatch { schema: Schema { fields: [Field { name: "a", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "b", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "c", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "d", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "a2", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "b2", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "e", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "f", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, columns: [PrimitiveArray<Int64>
... 
+ 5M lines of output before stopping

I also removed the interrupt signal in the test_arrow_c_stream_interrupted of the previous version and it returns the same number of batches (2992), so the interrupt signal is not really stopping the execution. It is, however, being triggered after the stream ends, since the except KeyboardInterrupt branch is hit.

nuno-faria and others added 14 commits November 29, 2025 19:36
Updated wait_for_future to surface pending Python exceptions by
executing bytecode during signal checks, ensuring that asynchronous
interrupts are processed promptly. Enhanced PartitionedDataFrameStreamReader
to cancel remaining partition streams on projection errors or Python
interrupts, allowing for clean iteration stops. Added regression tests
to validate interrupted Arrow C stream reads and improve timing
for RecordBatchReader.read_all cancellations.
@kosiew
Contributor

kosiew commented Nov 29, 2025

Hi @nuno-faria,
I found a way to capture KeyboardInterrupt in nuno-faria#1.
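
A rough illustration of the "executing bytecode during signal checks" idea from the commit message above (hedged; the actual change lives in nuno-faria#1):

use pyo3::prelude::*;

// Hedged sketch: exceptions injected via PyThreadState_SetAsyncExc are only
// delivered while the interpreter executes bytecode, so evaluating a trivial
// expression inside the attached state lets them surface before check_signals.
fn check_for_interrupts() -> PyResult<()> {
    Python::attach(|py| {
        py.eval(c"None", None, None)?; // runs one round of bytecode evaluation
        py.check_signals()
    })
}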

@nuno-faria
Contributor Author

Thanks @kosiew, I merged the PR.

Just a side note on the read_stream function:

def read_stream():
    ...
    try:
        ...
    except KeyboardInterrupt:
        read_exception.append(KeyboardInterrupt)
    except Exception as e:
        read_exception.append(e)

The except KeyboardInterrupt: branch is not triggered, but rather the except Exception as e: branch, as the reader throws an External error: KeyboardInterrupt, which is what I think @timsaucer was expecting here.

@timsaucer
Member

I believe I have found the root cause of the change in behavior of the interrupt unit test: apache/datafusion#17452

You can test this yourself if you go to main in datafusion-python and try these two patches, which pin datafusion to right before and right after apache/datafusion#17452:

failing_test.txt
passing_test.txt

I think this note from the PR is relevant:

    /// This method is async and uses a [`tokio::sync::Barrier`] to wait for all partitions
    /// to report their bounds. Once that occurs, the method will resolve for all callers and the
    /// dynamic filter will be updated exactly once.
    ///
    /// # Note
    ///
    /// As barriers are reusable, it is likely an error to call this method more times than the
    /// total number of partitions - as it can lead to pending futures that never resolve. We rely
    /// on correct usage from the caller rather than imposing additional checks here. If this is a concern,
    /// consider making the resulting future shared so the ready result can be reused.

I haven't looked much at tokio barriers, but it seems like this is somehow interfering with how we are trying to interrupt the task.
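
A minimal standalone sketch (not the DataFusion code) of how a reusable Barrier can leave a future pending forever when not all parties arrive:

use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Barrier;

// With two expected parties and only one waiter, barrier.wait() never
// resolves; a select! loop polling such a future alongside a signal check
// would spin without ever producing a result.
#[tokio::main]
async fn main() {
    let barrier = Arc::new(Barrier::new(2));
    let waiter = tokio::spawn({
        let barrier = Arc::clone(&barrier);
        async move {
            barrier.wait().await; // pends forever: the second party never calls wait()
        }
    });
    tokio::time::sleep(Duration::from_millis(100)).await;
    assert!(!waiter.is_finished()); // the wait() future is still pending
}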

I see you have a fix that updates the unit test. That is good, and probably unblocks this PR, but I do want to get to the very root of this problem and make sure we have a robust solution.

Tagging @rkrishn7 and @adriangb just so they know we're talking about the work they put in upstream.

nuno-faria and others added 2 commits November 30, 2025 18:31
Co-authored-by: Tim Saucer <timsaucer@gmail.com>